package cgl.narada.node;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.net.HttpURLConnection;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.URL;
import java.util.Properties;
import java.util.StringTokenizer;

import org.apache.log4j.Logger;

import cgl.narada.discovery.broker.messages.BrokerAdvertisement;
import cgl.narada.mgmt.common.CommandlineArguments;
import cgl.narada.mgmt.common.DefaultBrokerProperties;
import cgl.narada.mgmt.common.NetworkUtil;
import cgl.narada.mgmt.common.NodeAddressConvertor;
import cgl.narada.mgmt.schemas.bsa.BrokerInformationDocument;
import cgl.narada.mgmt.schemas.bsa.BrokerInformationDocument.BrokerInformation;
import cgl.narada.protocol.GatewayInfo;
import cgl.narada.protocol.NodeAddress;
import cgl.narada.protocol.ProtocolHandler;
import cgl.narada.transport.TransportException;
import cgl.narada.transport.ipsec.IPSecTunneler;
import cgl.narada.util.PropertiesHelp;
import cgl.narada.util.SystemInit;
import cgl.narada.util.SystemNotInitializedException;
import cgl.narada.util.UUIDRetriever;
import cgl.narada.util.Version;

/**
 * The Narada Broker instance. The properties for this broker are specified in
 * the configuration file.
 * 
 * @author Shrideep Pallickara
 * 
 * <p>
 * This interface has been modified to make the broker more manageable <br>
 * Modified on Dec 19, 2005 by
 * @author Harshawardhan Gadgil
 * 
 * $Revision$ $Date$
 */

public class BrokerNode implements BrokerSystemEventListener, NodeDebugFlags {

    static Logger log = Logger.getLogger("BrokerNode");

    /** The nodeAddress for this specific Broker Node */
    private NodeAddress thisNodeAddress;

    /**
     * Specifies the number of links allowed at each level. A 0 at level-i
     * indicates that this node can never be a level-i gateway
     */
    private short[] connectionVector = { 5, 4, 3, 0, 0 };

    /**
     * Used by the NodeAdditionProtocol to decide if a node should be allowed to
     * be part of its unit/super-unit.
     */
    private String ipDiscriminator;

    /** The number of levels used by the system */
    private short systemLevels;

    /** The protocol Handler */
    private ProtocolHandler protocolHandler;

    /**
     * Gateway Info : Maintains information about relevant gatekeepers within
     * the system
     */
    private GatewayInfo gatewayInfo;

    private boolean startedMonitoring = false;

    private Properties brokerProperties;

    private BrokerInformationDocument brokerInformationDocument;

    private BrokerInformation brokerInformation;

    private BrokerSystemEventGenerator evGen;

    // Added so that protocol handler can easily create BrokerAdvertisements
    public static String aboutThisBroker;

    public static String belongsToGroup;

    private static BrokerNode brokerNode = null;

    /**
     * Provides the XML document that wraps this broker's information.
     * 
     * @return the <code>BrokerInformationDocument</code> held by this node
     *         (note: the document, not the inner BrokerInformation element);
     *         <code>null</code> once {@link #shutdown()} has been called
     */
    public final synchronized BrokerInformationDocument getBrokerInformation() {
        final BrokerInformationDocument document = this.brokerInformationDocument;
        return document;
    }

    /**
     * Broker system event callback. The argument is cast to the id of a link.
     * If that link is still recorded in the broker connection list it is
     * treated as a link that was broken rather than closed on request: the
     * record is removed and, when an event generator is installed, a link
     * loss event is generated for it.
     * 
     * @param arg0
     *            the link id, as a <code>String</code>
     */
    public void handleEvent(Object arg0) {

        final String lostLinkId = (String) arg0;

        final BrokerInformationDocument.BrokerInformation.BrokerConnections[] connections = brokerInformation
            .getBrokerConnectionsArray();

        // Locate the first recorded connection that carries this link id
        int matchIndex = -1;
        for (int idx = 0; idx < connections.length && matchIndex < 0; idx++) {
            if (connections[idx].getLinkId().equals(lostLinkId)) {
                matchIndex = idx;
            }
        }

        if (matchIndex < 0) {
            // Either the link does not belong to this node, or it was closed
            // upon request (closeLink removes the record before closing).
            return;
        }

        // Still recorded => the connection was broken and not legally closed
        // down. Only in such cases is an alarm raised.
        brokerInformation.removeBrokerConnections(matchIndex);
        if (evGen != null) {
            evGen.generateLinkLossEvent(lostLinkId);
        }
    }

    /**
     * Installs the generator that {@link #handleEvent(Object)} uses to raise
     * link loss events. Passing <code>null</code> disables event generation.
     * 
     * @param evGen
     *            the event generator to use, may be <code>null</code>
     */
    public final synchronized void setBrokerSystemEventGenerator(
        BrokerSystemEventGenerator evGen) {
        this.evGen = evGen;
    }

    /**
     * Creates a broker from the supplied configuration. The constructor
     * initializes the system, creates the broker information document,
     * initializes the protocol layer, configures the broker discovery
     * services and finally tries to advertise this broker at the configured
     * BDNs.
     * <p>
     * Properties read here: <code>serviceConfigFile</code> (defaults to
     * <code>&lt;NB home&gt;/config/ServiceConfiguration.txt</code>),
     * <code>AboutThisBroker</code>, <code>VirtualBrokerNetwork</code>
     * (defaults to <code>*</code>), <code>DiscoveryResponsePolicy</code> and
     * <code>MAXBrokerDiscoRequests</code> (defaults to 1000).
     * 
     * @param props
     *            the broker configuration; kept by reference and updated
     *            during initialization
     */
    public BrokerNode(Properties props) {

        brokerProperties = props;

        SystemInit.init();

        // For maintaining the BrokerInformation
        brokerInformationDocument = BrokerInformationDocument.Factory
            .newInstance();
        brokerInformation = brokerInformationDocument.addNewBrokerInformation();

        String nbHome = null;
        try {
            nbHome = SystemInit.getInitData(SystemInit.SYS_NB_HOME);
        } catch (SystemNotInitializedException e) {
            log.error("", e);
        }

        // Required for Broker Discovery Request Response Helper
        String serviceConfigFile = props.getProperty("serviceConfigFile");
        if (serviceConfigFile == null)
            serviceConfigFile = nbHome + "/config/ServiceConfiguration.txt";

        // Initialize the Broker
        initializeBroker(brokerProperties);
        protocolHandler.setBrokerSystemEventListener(this);
        protocolHandler.setServiceConfigurationLocation(serviceConfigFile);

        // Some constants related to this broker instance
        belongsToGroup = brokerProperties.getProperty("VirtualBrokerNetwork");
        if (belongsToGroup == null) {
            belongsToGroup = "*";
        }

        if (belongsToGroup.equals("*"))
            System.out.println("BelongsTO: PUBLIC Network");
        else
            System.out.println("BelongsTO: PRIVATE Network(" + belongsToGroup
                + ")");

        // The description may be wrapped in [ ]. A missing property used to
        // cause a NullPointerException here; fall back to an empty
        // description instead.
        String about = brokerProperties.getProperty("AboutThisBroker");
        if (about == null) {
            log.warn("Property AboutThisBroker is not set; "
                + "using an empty broker description");
            about = "";
        }
        if (about.startsWith("[")) {
            about = about.substring(1);
        }
        if (about.endsWith("]")) {
            about = about.substring(0, about.length() - 1);
        }
        aboutThisBroker = about;

        // Set the Discovery Response Policy Verifier
        String discoveryResponsePolicyHandler = brokerProperties
            .getProperty("DiscoveryResponsePolicy");

        protocolHandler
            .setBrokerDiscoveryRequestResponsePolicyHandler(discoveryResponsePolicyHandler);

        // Initialize Broker Discovery Request forwarding service
        if (protocolHandler.isNodeAddressAssigned()) {
            protocolHandler.initDiscoveryRequestForwardingService();
        }

        // Initialize Broker Discovery Request FIFO table
        int maxDiscoveryRequestEntries = 1000;

        if (brokerProperties.containsKey("MAXBrokerDiscoRequests")) {
            String maxDiscoveryRequestValue = brokerProperties
                .getProperty("MAXBrokerDiscoRequests");
            try {
                maxDiscoveryRequestEntries = Integer
                    .parseInt(maxDiscoveryRequestValue);
            } catch (NumberFormatException e) {
                // Keep the default rather than aborting broker start-up
                log.warn("Invalid MAXBrokerDiscoRequests value >"
                    + maxDiscoveryRequestValue + "<, using "
                    + maxDiscoveryRequestEntries, e);
            }
        }
        protocolHandler.initDiscoveryRequestTable(maxDiscoveryRequestEntries);

        // Register with BrokerDiscovery Node
        tryAdvertisingBrokerAtBDN();
    }

    /**
     * Builds and starts the protocol layer of this broker and loads every
     * transport for which a non-zero port is configured.
     * <p>
     * In order: prints the banner, reads <code>Discriminator</code>,
     * determines the node address (<code>AssignedAddress</code> /
     * <code>NodeAddress</code>), fills the broker information, creates the
     * {@link GatewayInfo}, {@link NodeAddress} and {@link ProtocolHandler},
     * initializes the secure topics manager with the
     * <code>BrokerKeyStore</code> path relative to the NB home, starts the
     * protocol handler, sets up the transports and, when an address is
     * assigned, the RTP link factory.
     * <p>
     * The process is terminated when an assigned address is requested but no
     * <code>NodeAddress</code> property is present.
     * 
     * @param brokerProperties
     *            the broker configuration; the port properties are rewritten
     *            with the value returned by {@link #setUpNode(int, String)}
     */
    private void initializeBroker(Properties brokerProperties) {
        String banner = "\n \t \t The NaradaBrokering System\n \t "
            + Version.getVersion()
            + "\n\tCommunity Grids Lab - Indiana University \n";
        System.out.println(banner);

        ipDiscriminator = brokerProperties.getProperty("Discriminator");

        /* Verify if address is automatically assigned or needs to be generated */
        boolean assignedAddress = true;
        if (brokerProperties.containsKey("AssignedAddress")
            && "false".equals(brokerProperties.getProperty("AssignedAddress"))) {
            assignedAddress = false;
        }

        // Fill BrokerInformation
        brokerInformation.setIpAddress(NetworkUtil.getHostIPAddress());
        brokerInformation.setNodeAddressAssigned(assignedAddress);
        brokerInformation.setGatewayAddressAssigned(false);

        // HG: Changed for a static node address
        String addressText;
        if (assignedAddress) {
            if (!brokerProperties.containsKey("NodeAddress")) {
                log.fatal("Unable to convert Node address: >"
                    + brokerProperties.getProperty("NodeAddress") + "<");
                // NOTE(review): exits with status 0 although this is a fatal
                // configuration error; kept as-is for existing launch scripts.
                System.exit(0);
            }
            addressText = brokerProperties.getProperty("NodeAddress");
        } else {
            // An address of 0,0,0,0 indicates that this node needs an address
            addressText = "0,0,0,0";
        }
        int[] address = NodeAddressConvertor.toIntArray(addressText);
        systemLevels = (short) (address.length - 1);

        int concurrentConnections = 3000;
        if (brokerProperties.containsKey("ConcurrentConnectionLimit")) {
            concurrentConnections = Integer.parseInt(brokerProperties
                .getProperty("ConcurrentConnectionLimit"));
        }

        gatewayInfo = new GatewayInfo(systemLevels);
        thisNodeAddress = new NodeAddress(address);

        protocolHandler = new ProtocolHandler(thisNodeAddress,
                                              connectionVector,
                                              concurrentConnections,
                                              ipDiscriminator, gatewayInfo);

        // HG: Need to initialize this ASAP, Hence moved here ...
        String keyStoreFileName = brokerProperties
            .getProperty("BrokerKeyStore");

        String nbHome = null;
        try {
            nbHome = SystemInit.getInitData(SystemInit.SYS_NB_HOME);
        } catch (SystemNotInitializedException e) {
            log.error("", e);
        }

        keyStoreFileName = nbHome + "/" + keyStoreFileName;

        System.out.println("Using BrokerKeyStore: " + keyStoreFileName);
        protocolHandler.initSecureTopicsManager(keyStoreFileName);

        protocolHandler.initializeProtocolSuite();
        protocolHandler.start();

        if (assignedAddress) {
            protocolHandler.setAssignedAddress(thisNodeAddress);
        }
        gatewayInfo.setProtocolHandler(protocolHandler);

        /*
         * Ports on which the broker listens to connections. Each row is
         * { property key, transport type, log message prefix or null }. The
         * order of the rows is the order in which the transports are loaded.
         */
        String[][] transports = {
            { "NIOTCPBrokerPort", "niotcp",
             "Support for Non-Blocking IO available on TCP port [" },
            { "TCPBrokerPort", "tcp",
             "Blocking IO support is available on TCP port [" },
            { "PTCPBrokerPort", "ptcp", null },
            { "UDPBrokerPort", "udp",
             "Listening to datagram packets on UDP port [" },
            { "MulticastGroupPort", "multicast", null },
            { "PoolTCPBrokerPort", "pooltcp", null },
            { "HTTPBrokerPort", "http", null },
            { "HTTPSBrokerPort", "https", null },
            { "SSLBrokerPort", "ssl", null },
            { "UP2PBrokerPort", "up2p", null } };

        for (int i = 0; i < transports.length; i++) {
            setUpTransportFromProperty(brokerProperties, transports[i][0],
                                       transports[i][1], transports[i][2]);
        }

        log.info("Limit on concurrent connections is " + concurrentConnections);

        if (assignedAddress) {
            setupRTPLinkFactory();
        }
    }

    /**
     * Loads one transport if its port property is present and non-zero, and
     * writes the port returned by {@link #setUpNode(int, String)} back into
     * the property (a negative value when loading failed).
     * 
     * @param brokerProperties
     *            the broker configuration to read from and update
     * @param portKey
     *            name of the property holding the port number
     * @param transportType
     *            transport type passed to <code>setUpNode</code>
     * @param availabilityMessage
     *            prefix of the info message logged on success (the port and a
     *            closing bracket are appended), or <code>null</code> to log
     *            nothing
     */
    private void setUpTransportFromProperty(Properties brokerProperties,
                                            String portKey,
                                            String transportType,
                                            String availabilityMessage) {
        if (!brokerProperties.containsKey(portKey)) {
            return;
        }

        int port = Integer.parseInt(brokerProperties.getProperty(portKey));
        if (port == 0) {
            // A port of 0 means the transport is not wanted
            return;
        }

        port = setUpNode(port, transportType);
        brokerProperties.setProperty(portKey, "" + port);

        if (port > 0 && availabilityMessage != null) {
            log.info(availabilityMessage + port + "]");
        }
    }

    /**
     * Loads the RTP link factory when the <code>SupportRTP</code> property is
     * exactly <code>yes</code>.
     * <p>
     * Before loading, the method polls the protocol handler for an assigned
     * node address, at most ten times with a 200 ms pause between polls. The
     * original loop never advanced its counter and could therefore wait
     * forever. The factory is loaded even when no address was assigned within
     * that time, as before.
     */
    private void setupRTPLinkFactory() {
        String rtpInit = "NO";

        if (brokerProperties.containsKey("SupportRTP")) {
            rtpInit = brokerProperties.getProperty("SupportRTP");
        }

        if (!"yes".equals(rtpInit)) {
            return;
        }

        final int maxAttempts = 10;
        final long pollIntervalMillis = 200;

        boolean assigned = false;
        for (int attempt = 0; attempt < maxAttempts && !assigned; attempt++) {
            assigned = protocolHandler.isNodeAddressAssigned();
            if (!assigned) {
                try {
                    Thread.sleep(pollIntervalMillis);
                } catch (InterruptedException e) {
                    // Preserve the interrupt for the caller and stop waiting
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }

        if (!assigned) {
            log.warn("Node address still not assigned after " + maxAttempts
                + " checks; loading the rtp link factory anyway");
        }

        // load rtp link factory
        try {
            Properties props = new Properties(); // nothing added
            protocolHandler.loadCommunicationsOfType(props, "rtp");
        } catch (TransportException transEx) {
            log.error("", transEx);
        }
    }

    /**
     * Builds the transport specific properties for the given transport type
     * and asks the protocol handler to load that transport.
     * <p>
     * Recognized types are <code>niotcp</code>, <code>tcp</code>,
     * <code>ptcp</code>, <code>udp</code>, <code>multicast</code>,
     * <code>pooltcp</code>, <code>http</code>, <code>https</code>,
     * <code>ssl</code> and <code>up2p</code>. Any other type is passed to the
     * protocol handler with empty properties.
     * <p>
     * Required settings that are missing (multicast group host, PTCP stream
     * number, UP2P peer id / relay host / relay port) are reported and make
     * the method return -1 instead of failing with a runtime exception.
     * 
     * @param portNum
     *            the port the transport should listen on
     * @param transportType
     *            the transport to load
     * @return <code>portNum</code> if the transport was loaded, -1 otherwise
     */
    private int setUpNode(int portNum, String transportType) {
        Properties props = new Properties();
        String port = Integer.toString(portNum);

        if (transportType.equals("niotcp")) {
            props.put("NIOTCPServerPort", port);
        }

        else if (transportType.equals("tcp")) {
            props.put("TCPServerPort", port);
        }

        else if (transportType.equals("ptcp")) {
            props.put("PTCPServerPort", port);

            String streamNumber = brokerProperties
                .getProperty("PTCPStreamNumber");
            int ptcpStreamNumber;
            try {
                // parseInt(null) also ends up here
                ptcpStreamNumber = Integer.parseInt(streamNumber);
            } catch (NumberFormatException e) {
                log.error("PTCPStreamNumber >" + streamNumber
                    + "< is missing or not a number! Will not "
                    + "start ptcp communication services!", e);
                return -1;
            }
            props.put("PTCPStreamNumber", Integer.toString(ptcpStreamNumber));
        }

        else if (transportType.equals("udp")) {
            props.put("UDPListenerPort", port);
        }

        else if (transportType.equals("multicast")) {
            String groupHost = brokerProperties
                .getProperty("MulticastGroupHost");
            if (groupHost == null) {
                log.info("MulticastGroup specified is null! Will not "
                    + "start multicast communication services!");
                return -1;
            }
            props.put("MulticastGroupHost", groupHost);
            props.put("MulticastGroupPort", port);
        }

        else if (transportType.equals("pooltcp")) {
            props.put("PoolTCPServerPort", port);
        }

        else if (transportType.equals("http")) {
            props.put("acceptor.http.port", port);
        }

        else if (transportType.equals("https")) {
            props.put("listenerport", port);
        }

        else if (transportType.equals("ssl")) {
            props.put("acceptor.ssl.port", port);

            if (brokerProperties.containsKey("ControlServerPort")) {
                props.put("controlServer.port", brokerProperties
                    .getProperty("ControlServerPort"));
            } else {
                // Try default properties
                // Assume that SystemInit was done at this point
                try {
                    props.put("acceptor.keyStore", SystemInit
                        .getInitData(SystemInit.SYS_BROKER_KEYSTORE));
                    props.put("acceptor.keyStorePassword", SystemInit
                        .getInitData(SystemInit.SYS_BROKER_KEYSTORE_PASSWORD));
                    props
                        .put(
                             "acceptor.trustStore",
                             SystemInit
                                 .getInitData(SystemInit.SYS_BROKER_CERTIFICATE_TRUSTRORE));
                    props
                        .put(
                             "acceptor.trustStorePassword",
                             SystemInit
                                 .getInitData(SystemInit.SYS_BROKER_TRUSTSTORE_PASSWORD));
                } catch (SystemNotInitializedException e) {
                    log.error("", e);
                }
            }
        }

        else if (transportType.equals("up2p")) {
            // Properties rejects null values, so verify the up2p specific
            // settings before copying them.
            String peerId = brokerProperties.getProperty("PeerID");
            String relayHost = brokerProperties.getProperty("RelayServerHost");
            String relayPort = brokerProperties.getProperty("RelayServerPort");
            if (peerId == null || relayHost == null || relayPort == null) {
                log.error("PeerID, RelayServerHost and RelayServerPort must "
                    + "all be specified! Will not start up2p "
                    + "communication services!");
                return -1;
            }

            props.put("UP2PListenerPort", port);
            props.put("PeerID", peerId);
            props.put("RelayServerHost", relayHost);
            props.put("RelayServerPort", relayPort);
        }

        try {
            protocolHandler.loadCommunicationsOfType(props, transportType);
        } catch (TransportException e) {
            log.error("", e);
            return -1;
        }
        return portNum;
    }

    /**
     * Broker Discovery specific advertisement request generator.
     * <p>
     * Builds a {@link BrokerAdvertisement} for this broker and POSTs its
     * bytes to every URL in the comma separated <code>BDNList</code> property
     * (optionally wrapped in [ ]). Every line of each response is printed. A
     * failure for one BDN is reported and does not stop the remaining ones.
     * Nothing is sent when <code>BDNList</code> is not configured.
     * <p>
     * Streams and the connection are released after every attempt, whether it
     * succeeded or not.
     */
    private void tryAdvertisingBrokerAtBDN() {

        BrokerAdvertisement ad = new BrokerAdvertisement();

        ad.setHostname(NetworkUtil.getHostIPAddress());
        System.out.println("Setting HOST: " + ad.getHostname());
        ad.setSupportedTransportProtocols(protocolHandler
            .getSupportedTransportProtocols());
        ad.setInfo(aboutThisBroker);
        ad.setBelongsToNetwork(belongsToGroup);

        if (!brokerProperties.containsKey("BDNList")) {
            log.info("No BDNs have been specified. "
                + "NO Broker registerations will be performed");
            return;
        }

        String bdnList = brokerProperties.getProperty("BDNList");

        if (bdnList.startsWith("[")) {
            bdnList = bdnList.substring(1);
        }

        if (bdnList.endsWith("]")) {
            bdnList = bdnList.substring(0, bdnList.length() - 1);
        }

        StringTokenizer st = new StringTokenizer(bdnList, ",");

        byte[] badBytes = ad.getBytes();

        while (st.hasMoreTokens()) {
            String BDNUrl = st.nextToken().trim();
            if (BDNUrl.length() == 0) {
                // e.g. "url1, ,url2"
                continue;
            }

            System.out.println("Trying: " + BDNUrl);

            HttpURLConnection urlConnection = null;
            OutputStream os = null;
            BufferedReader br = null;
            try {
                URL url = new URL(BDNUrl);
                urlConnection = (HttpURLConnection) url.openConnection();

                urlConnection.setRequestMethod("POST");
                urlConnection.setDoInput(true);
                urlConnection.setDoOutput(true);
                urlConnection.setUseCaches(false);

                urlConnection.setRequestProperty("CONTENT_LENGTH", ""
                    + badBytes.length);
                os = urlConnection.getOutputStream();
                os.write(badBytes);
                os.flush();
                os.close();
                os = null; // closed normally, nothing left for finally

                // Now read the UUID associated with this broker
                br = new BufferedReader(new InputStreamReader(urlConnection
                    .getInputStream()));
                String line;
                while ((line = br.readLine()) != null) {
                    System.out.println("BDN Sends: " + line);
                }
            } catch (Exception e) {
                System.out.println("Unable to publish <" + BDNUrl + "> :" + e);
            } finally {
                if (os != null) {
                    try {
                        os.close();
                    } catch (IOException ignored) {
                        // best effort clean-up
                    }
                }
                if (br != null) {
                    try {
                        br.close();
                    } catch (IOException ignored) {
                        // best effort clean-up
                    }
                }
                if (urlConnection != null) {
                    urlConnection.disconnect();
                }
            }
        }
    }

    /**
     * Creates a link to the specified broker, records it in the broker
     * information and requests a gateway address over it. Helps to generate
     * a broker network topology. <br>
     * 
     * @param host
     *            The host on which the target broker resides
     * @param portNum
     *            The port number to which a connection is requested; must be
     *            a valid integer
     * @param medium
     *            The transport protocol to be used. Valid values are
     *            <ul>
     *            <li>'t' or 'tcp' - tcp</li>
     *            <li>'p' or 'ptcp' - ptcp</li>
     *            <li>'u' or 'udp' - udp</li>
     *            <li>'nio' or 'niotcp' - niotcp</li>
     *            <li>'up2p' - up2p</li>
     *            <li>'http', 'https' or 'ssl' - the <code>username</code>
     *            and <code>password</code> broker properties are added to
     *            the link properties</li>
     *            </ul>
     * @param level
     *            The gateway level passed to
     *            {@link #requestGatewayAddress(String, int)} for the new
     *            link
     * @return A String representing the linkId of the created link if
     *         successful, NULL otherwise (unknown medium, missing host,
     *         invalid port or transport failure)
     */
    public String connectTo(String host, String portNum, String medium,
                            int level) {

        log.info("Connecting to: \n\tHOST: " + host + "\n\tPORT: " + portNum
            + "\n\tMEDIUM: " + medium);

        if (host == null) {
            log.error("No host specified for the connection");
            return null;
        }

        // Validate the port BEFORE the link is created. Previously a bad
        // port failed after setupLink, leaving a live link with a partially
        // filled connection record.
        int port;
        try {
            port = Integer.parseInt(portNum);
        } catch (NumberFormatException e) {
            log.error("Invalid port number {" + portNum + "} requested");
            return null;
        }

        Properties props = new Properties();

        props.put("hostname", host);
        props.put("portnum", portNum);

        String transport = "niotcp";

        if ("t".equals(medium) || "tcp".equals(medium))
            transport = "tcp";

        else if ("p".equals(medium) || "ptcp".equals(medium))
            transport = "ptcp";

        else if ("u".equals(medium) || "udp".equals(medium))
            transport = "udp";

        else if ("nio".equals(medium) || "niotcp".equals(medium))
            transport = "niotcp";

        else if ("http".equals(medium) || "https".equals(medium)
            || "ssl".equals(medium)) {
            // NOTE(review): transport stays "niotcp" here, exactly as in the
            // previous version where the assignment was commented out -
            // confirm that this is intended.
            // Properties rejects null values, so copy only what is set.
            String username = brokerProperties.getProperty("username");
            if (username != null)
                props.put("username", username);

            String password = brokerProperties.getProperty("password");
            if (password != null)
                props.put("password", password);
        }

        else if ("up2p".equals(medium))
            transport = "up2p";

        else {
            log.error("Unknown connection type {" + medium + "} requested");
            return null;
        }

        try {
            String linkId = protocolHandler.setupLink(props, transport);

            // Also save the brokerConnection information
            BrokerInformationDocument.BrokerInformation.BrokerConnections connInfo = brokerInformation
                .addNewBrokerConnections();
            connInfo.setDestination(host);
            connInfo.setPort(port);
            connInfo.setProtocol(transport);
            connInfo.setLinkId(linkId);

            requestGatewayAddress(linkId, level);

            return linkId;
        } catch (TransportException e) {
            // Pass the exception itself so that the stack trace is logged
            log.error("", e);
            return null;
        }
    }

    /**
     * Older version of connectTo. Hands the request straight to the protocol
     * handler; unlike
     * {@link #connectTo(String, String, String, int)} it neither records the
     * link in the broker information nor requests a gateway address.
     * 
     * @param props
     *            the link properties passed to the protocol handler
     * @param transport
     *            the transport type passed to the protocol handler
     * @return the link id reported by the protocol handler
     * @throws TransportException
     *             if the protocol handler could not set up the link
     */
    public String connectTo(Properties props, String transport)
        throws TransportException {
        return protocolHandler.setupLink(props, transport);
    }

    /**
     * Sends a node addition request over the given link, unless the broker
     * information already marks a node address as assigned. After the request
     * has been sent the broker information is flagged as having a node
     * address assigned.
     * 
     * @param client -
     *            The link id of the connection on which the request is to be
     *            sent
     * @param s_level -
     *            address level, as a decimal integer string
     * @return - True if an address was already assigned or the request was
     *         sent; false if no request could be constructed
     */
    public boolean requestNodeAddress(String client, String s_level) {

        // Nothing to do when a node address was already assigned
        if (brokerInformation.getNodeAddressAssigned()) {
            return true;
        }

        final int requestedLevel = Integer.parseInt(s_level);
        final int[] levelSpec = new int[] { 0, requestedLevel };

        final byte[] payload = protocolHandler
            .constructNodeAdditionRequest(levelSpec, true);

        if (payload != null) {
            protocolHandler.sendTo(client, payload);
            brokerInformation.setNodeAddressAssigned(true);
            return true;
        }

        return false;
    }

    /**
     * Sends a gateway setup request over the given link, unless the broker
     * information already marks a gateway address as assigned.
     * 
     * @param client -
     *            The link id of the connection on which the request is to be
     *            sent
     * @param gatewayLevel -
     *            address level
     * @return - True if a gateway address was already assigned or the request
     *         was sent; false if no request could be constructed
     */
    public boolean requestGatewayAddress(String client, int gatewayLevel) {

        // Nothing to do when a gateway address was already assigned
        if (brokerInformation.getGatewayAddressAssigned()) {
            return true;
        }

        final byte[] payload = protocolHandler
            .constructGatewaySetupRequest(client, gatewayLevel);

        if (payload != null) {
            protocolHandler.sendTo(client, payload);
            return true;
        }

        return false;
    }

    /**
     * Lists the links of this broker node, as enumerated by the protocol
     * handler.
     * 
     * @return An Array of Strings containing the names of various links on
     *         this broker
     */
    public String[] getLinks() {
        final String[] linkNames = protocolHandler.enumerateLinks();
        return linkNames;
    }

    /**
     * Closes the link specified by the linkID parameter.
     * <p>
     * Only links recorded in the broker information (those created through
     * {@link #connectTo(String, String, String, int)}) are closed. For any
     * other id nothing happens apart from the console message. The record is
     * removed BEFORE the link is closed so that
     * {@link #handleEvent(Object)} does not report the closure as a link
     * loss.
     * 
     * @param linkID -
     *            Specifies the link to delete
     */
    public void closeLink(String linkID) {
        System.out.println("CLOSING LINK: " + linkID);
        boolean found = false;

        // Make sure this happens first !
        BrokerInformationDocument.BrokerInformation.BrokerConnections[] bConns = brokerInformation
            .getBrokerConnectionsArray();
        for (int i = 0; i < bConns.length; i++) {
            if (bConns[i].getLinkId().equals(linkID)) {
                found = true;
                brokerInformation.removeBrokerConnections(i);
                break;
            }
        }

        if (found) { // Now close the link
            protocolHandler.closeLink(linkID);
        }
    }

    /**
     * Shuts down the broker: drops the reference to the broker information
     * document and then shuts down the protocol handler.
     * 
     * @return always <code>true</code>
     */
    public boolean shutdown() {
        this.brokerInformationDocument = null;
        this.protocolHandler.shutdown();
        return true;
    }

    /**
     * Passes the given data to the protocol handler for delivery.
     * 
     * @param client
     *            the destination identifier understood by the protocol
     *            handler (elsewhere in this class a link id is used)
     * @param data
     *            the bytes to send
     */
    public void sendTo(String client, byte[] data) {
        this.protocolHandler.sendTo(client, data);
    }

    /**
     * Starts the monitoring service of the protocol handler the first time
     * this method is called, and enables monitoring on every call.
     * 
     * @throws TransportException
     *             propagated from the protocol handler
     */
    public void startMonitoringService() throws TransportException {
        final boolean firstStart = !startedMonitoring;
        if (firstStart) {
            protocolHandler.startMonitoringService();
            startedMonitoring = true;
        }

        protocolHandler.enableMonitoring();
    }

    /**
     * Enables monitoring, starting the monitoring service first if that has
     * not happened yet.
     * <p>
     * {@link #startMonitoringService()} already enables monitoring on every
     * call, so this method only delegates to it. The previous version enabled
     * monitoring a second time right afterwards.
     * 
     * @throws TransportException
     *             propagated from {@link #startMonitoringService()}
     */
    public void enableMonitoringService() throws TransportException {
        startMonitoringService();
    }

    /**
     * Asks the protocol handler to disable monitoring.
     */
    public void disableMonitoringService() {
        this.protocolHandler.disableMonitoring();
    }

    /**
     * Provides the configuration of this broker. The returned object is the
     * live instance used by the broker, not a copy.
     * 
     * @return Returns the brokerProperties.
     */
    public final synchronized Properties getBrokerProperties() {
        final Properties configuration = this.brokerProperties;
        return configuration;
    }

    /**
     * Command line entry point. Creates the broker either from the default
     * broker properties (<code>--useDefaultBrokerProperties</code>) or from
     * the file named by <code>--brokerConfig</code>, in which case
     * <code>--serviceConfig</code> is required as well. When
     * <code>--brokerCommunicatorPort</code> is greater than zero the broker
     * front process is started too.
     * <p>
     * The process exits after printing the usage text when required
     * arguments are missing, and exits with status 1 when the broker
     * configuration file cannot be read.
     * 
     * @param args
     *            the command line arguments
     */
    public static void main(String[] args) {
        CommandlineArguments cmdLineArgs = new CommandlineArguments(args);

        String brokerConfigFileName = cmdLineArgs
            .getStringProperty("brokerConfig", null);
        String serviceConfigFile = cmdLineArgs
            .getStringProperty("serviceConfig", null);

        // We don't pass this argument if we don't want this to start
        int brokerCommunicatorPort = cmdLineArgs
            .getIntProperty("brokerCommunicatorPort", 0);

        String brokerCommunicatorFile = cmdLineArgs
            .getStringProperty("brokerCommunicatorFile", null);

        boolean useDefaultProperties = cmdLineArgs
            .getBooleanProperty("useDefaultBrokerProperties", false);

        Properties props;

        if (useDefaultProperties) {
            props = DefaultBrokerProperties.getBROKER_CONFIGURATION();
        } else {
            if ((brokerConfigFileName == null) || (serviceConfigFile == null)) {
                System.out.println("Usage: java cgl.narada.node.BrokerNode "
                    + "--brokerConfig=brokerConfigFilename\n"
                    + "--serviceConfig=serviceConfigFilename\n"
                    + "[--brokerCommunicatorPort=portNum]\n"
                    + "[--brokerCommunicatorFile=uuidFile]\n"
                    + "[--useDefaultBrokerProperties]");
                System.exit(0);
            }

            props = new Properties();
            FileInputStream configStream = null;
            try {
                configStream = new FileInputStream(brokerConfigFileName);
                props.load(configStream);
            } catch (IOException e1) {
                // Without its configuration the broker cannot start; stop
                // here instead of continuing with empty properties.
                log.error("Unable to load broker configuration from "
                    + brokerConfigFileName, e1);
                System.exit(1);
            } finally {
                if (configStream != null) {
                    try {
                        configStream.close();
                    } catch (IOException ignored) {
                        // best effort clean-up
                    }
                }
            }

            // Set Service Configuration Location (non-null at this point)
            props.put("serviceConfigFile", serviceConfigFile);
        }

        brokerNode = new BrokerNode(props);

        if (brokerCommunicatorPort > 0) {
            try {
                brokerNode.frontProcess(brokerCommunicatorPort,
                                        brokerCommunicatorFile);
            } catch (Exception e) {
                log.error("", e);
            }
        }
    } /* end main() */

    // -------------------------
    // Broker Front Process Code
    // -------------------------

    /**
     * The thread serving the current broker-front connection, or
     * <code>null</code> until the first front connection has been accepted.
     */
    private static Thread currentFront = null;

    // Path of the file in which the front process stores the UUID that later
    // front connections must present. frontProcess replaces this default with
    // the brokerCommunicatorFile it is given.
    //
    // NOTE: This default poses a problem if multiple brokers are run from the
    // sample folder, since the uuid file will get overwritten.
    private String uuidFile = "../config/uuid.txt";

    /**
     * Runs the broker front: listens on the given port and hands each accepted
     * local connection to a {@link BrokerProcess}.
     * <p>
     * The first accepted connection becomes the current front, and a freshly
     * generated UUID is written to the uuid file. Every later connection must
     * send, as its first line, the UUID stored in that file: on a match the
     * previous front thread is interrupted and the new connection takes over,
     * otherwise the connection is closed. Connections that do not originate
     * from this host are closed after a warning is printed.
     * <p>
     * The method loops until an exception is thrown; the listening socket is
     * closed on the way out.
     * 
     * @param brokerCommunicatorPort port the front listens on
     * @param brokerCommunicatorFile file used to store the UUID; when
     *            <code>null</code> the current value of <code>uuidFile</code>
     *            is kept
     * @throws Exception if listening, accepting a connection, talking to a
     *             client or accessing the uuid file fails
     */
    private void frontProcess(int brokerCommunicatorPort,
                              String brokerCommunicatorFile) throws Exception {
        // Keep the default location when no file was supplied; assigning
        // unconditionally turned a missing argument into a null path.
        if (brokerCommunicatorFile != null) {
            uuidFile = brokerCommunicatorFile;
        }

        ServerSocket serverSock = new ServerSocket(brokerCommunicatorPort);
        try {
            while (true) {
                Socket sock = serverSock.accept();
                InetAddress peer = sock.getInetAddress();
                String sockHost = peer.getHostAddress();
                String hostName = InetAddress.getLocalHost().getHostAddress();
                /* check if it's from localhost (any loopback address counts) */
                boolean local = peer.isLoopbackAddress()
                    || sockHost.equals(hostName)
                    || sockHost.equals("127.0.0.1");
                if (!local) {
                    System.out
                        .println("Warning: This host is connected to BrokerFront: "
                            + sockHost);
                    sock.close();
                    continue;
                }

                BufferedReader sockReader = new BufferedReader(
                    new InputStreamReader(sock.getInputStream()));

                /* check if it's the first time ever front comm. */
                if (currentFront == null) {
                    BrokerProcess bp = new BrokerProcess(sock, sockReader);
                    currentFront = bp;
                    String id = UUIDRetriever.getInstance()
                        .getRandomBasedUUIDAsString();
                    PrintWriter pw = new PrintWriter(
                        new FileWriter(new File(uuidFile)));
                    try {
                        pw.write(id);
                    } finally {
                        pw.close();
                    }
                    bp.start();
                } else { /* check if it has sent a correct uuid */
                    // NOTE(review): this read happens on the accepting thread,
                    // so no further connection is accepted until the client
                    // sends a line or closes.
                    String uuid = sockReader.readLine();
                    String stored;
                    BufferedReader freader = new BufferedReader(
                        new FileReader(uuidFile));
                    try {
                        stored = freader.readLine();
                    } finally {
                        freader.close();
                    }
                    if (uuid != null && uuid.equals(stored)) {
                        BrokerProcess bp = new BrokerProcess(sock, sockReader);
                        currentFront.interrupt();
                        currentFront = bp;
                        bp.start();
                    } else {
                        sockReader.close();
                        sock.close();
                    }
                }
            }
        } finally {
            // Reached only through an exception; free the listening port.
            serverSock.close();
        }
    }

    /**
     * Thread that serves one broker-front connection. It reads text commands
     * line by line from the socket and applies them to the static
     * <code>brokerNode</code> (connect, node/gateway address requests, send,
     * monitoring on/off) or to the {@link IPSecTunneler}.
     * <p>
     * When the connection ends (end of stream or a read error) the socket is
     * closed and the thread stays parked until it is interrupted, at which
     * point it terminates.
     */
    public static class BrokerProcess extends Thread {

        // private BrokerNode brokerNode;

        /**
         * Connection properties handed to <code>connectTo</code>. The same
         * object is reused for every "c" command on this connection, so
         * entries added by an earlier command remain present.
         */
        private Properties props = new Properties();

        private Socket sock;

        private BufferedReader sockReader;

        /** Stays <code>null</code> if the output stream could not be opened. */
        private PrintWriter sockWriter;

        /**
         * Creates a handler for an accepted front connection.
         * 
         * @param sock the accepted socket; replies are written to its output
         *            stream
         * @param reader reader commands are read from (the caller wraps the
         *            socket's input stream)
         */
        public BrokerProcess(Socket sock, BufferedReader reader) {
            this.sock = sock;
            this.sockReader = reader;
            try {
                this.sockWriter = new PrintWriter(sock.getOutputStream(), true);
            } catch (IOException ioe) {
                ioe.printStackTrace();
            }
        }

        /**
         * Reads and executes commands until the connection ends, then parks
         * until interrupted.
         */
        public void run() {
            if (sockWriter == null) {
                // The constructor already reported the failure; without a
                // writer no command can be answered.
                closeConnection();
                return;
            }

            sockWriter.println("\n" + "Type h for help and Usage indicators"
                + "\n");
            while (true) {
                String cmd;
                try {
                    cmd = sockReader.readLine();
                } catch (IOException ioe) {
                    // A broken connection will not recover; retrying the read
                    // would fail again immediately and spin forever.
                    ioe.printStackTrace();
                    break;
                }
                if (cmd == null) {
                    break;
                }
                try {
                    handleCommand(cmd);
                } catch (Exception e) {
                    // A failing command must not end the session.
                    e.printStackTrace();
                }
            }

            closeConnection();

            /* if front socket closed: wait until a new front replaces us */
            try {
                while (true) {
                    Thread.sleep(10000);
                }
            } catch (InterruptedException ie) {
                // The interrupt is the signal to stop; keep the flag set and
                // let the thread end instead of sleeping again.
                Thread.currentThread().interrupt();
            }
        }

        /**
         * Executes a single command line. Lines that match no known command
         * are ignored.
         * 
         * @param cmd the line read from the client, never <code>null</code>
         * @throws Exception whatever the invoked broker operation throws, or
         *             a runtime exception for malformed numeric arguments
         */
        private void handleCommand(String cmd) throws Exception {
            StringTokenizer tok;

            if (cmd.startsWith("h")) {
                printHelp();
            }

            if (cmd.startsWith("c ")) {
                tok = new StringTokenizer(cmd, " ");
                int count = tok.countTokens();
                if (count > 5 || count < 4) {
                    sockWriter
                        .println("Usage -> c broker portNum t/u/nio/http [file]");
                    return;
                }
                tok.nextToken();
                String host = tok.nextToken();
                int port = Integer.parseInt(tok.nextToken());
                props.put("hostname", host);
                props.put("portnum", Integer.toString(port));
                // Unrecognised medium values fall back to niotcp.
                String transport = "niotcp";
                String medium = tok.nextToken();
                if (medium.equals("t")) {
                    transport = "tcp";
                } else if (medium.equals("p")) {
                    transport = "ptcp";
                } else if (medium.equals("u")) {
                    transport = "udp";
                } else if (medium.equals("nio")) {
                    transport = "niotcp";
                } else if (medium.equals("http") || medium.equals("ssl")) {
                    transport = "http";
                    if (tok.hasMoreTokens()) {
                        String configFile = tok.nextToken();
                        PropertiesHelp help = new PropertiesHelp(configFile,
                                                                 transport);
                        help.addProperties(props);
                    }
                }

                brokerNode.connectTo(props, transport);
            }

            if (cmd.startsWith("na")) {
                tok = new StringTokenizer(cmd, " ");
                if (tok.countTokens() != 3) {
                    sockWriter.println("Usage -> na broker:portNum levels");
                    return;
                }
                tok.nextToken();
                String client = tok.nextToken();
                String levels = tok.nextToken();

                // HG: NOTE:- Function Name/signature change
                brokerNode.requestNodeAddress(client, levels);

                brokerNode.setupRTPLinkFactory();
            }

            if (cmd.startsWith("ga")) {
                tok = new StringTokenizer(cmd, " ");
                if (tok.countTokens() != 3) {
                    sockWriter.println("Usage -> ga broker:portNum level");
                    return;
                }
                tok.nextToken();
                String client = tok.nextToken();
                int level = Integer.parseInt(tok.nextToken());

                // HG: NOTE:- Function Name change
                brokerNode.requestGatewayAddress(client, level);
            }

            if (cmd.startsWith("send")) {
                tok = new StringTokenizer(cmd, " ");
                if (tok.countTokens() < 3) {
                    // Report missing arguments instead of failing with a
                    // NoSuchElementException; extra tokens are ignored.
                    sockWriter.println("Usage -> send client data");
                    return;
                }
                tok.nextToken();
                String client = tok.nextToken();
                byte[] data = tok.nextToken().getBytes();
                brokerNode.sendTo(client, data);
            }

            if (cmd.startsWith("em")) {
                brokerNode.enableMonitoringService();
            }

            if (cmd.startsWith("dm")) {
                brokerNode.disableMonitoringService();
            }

            /** * Related to setting up IPSec Tunnel */
            if (cmd.startsWith("cr_ipsec_tunnel")) {
                tok = new StringTokenizer(cmd, " ");
                if (tok.countTokens() != 5) {
                    String errorReport = "Please make sure that you "
                        + "specify all arguments. The correct command "
                        + "for setting up the IPSec Tunnel is:\n\n\t"
                        + "cr_ipsec_tunnel <hostname> <policy_name> "
                        + "<rule_name> <secret>" + "\n";
                    System.out.println(errorReport);
                    // Also tell the client, like the other usage messages.
                    sockWriter.println(errorReport);
                    return;
                }
                // Separate object so the connect properties are not touched.
                Properties tunnelProps = new Properties();
                tok.nextToken();
                tunnelProps.put("IPSecHostname", tok.nextToken());
                tunnelProps.put("IPSecPolicyName", tok.nextToken());
                tunnelProps.put("IPSecRuleName", tok.nextToken());
                tunnelProps.put("IPSecSharedSecret", tok.nextToken());
                try {
                    IPSecTunneler tunneler = IPSecTunneler.getInstance();
                    tunneler.createTunnel(tunnelProps);
                } catch (TransportException e) {
                    System.out.println(e);
                }
                return;
            }
            /** end creation of tunnel */

            if (cmd.startsWith("del_ipsec_tunnel")) {
                tok = new StringTokenizer(cmd, " ");
                if (tok.countTokens() != 2) {
                    String errorReport = "Please make sure that you "
                        + "specify all arguments. The correct command "
                        + "for tearing down the IPSec Tunnel is:\n\n\t"
                        + "del_ipsec_tunnel <hostname> \n";
                    System.out.println(errorReport);
                    // Also tell the client, like the other usage messages.
                    sockWriter.println(errorReport);
                    return;
                }
                tok.nextToken();
                try {
                    IPSecTunneler tunneler = IPSecTunneler.getInstance();
                    tunneler.deleteTunnel(tok.nextToken());
                } catch (TransportException e) {
                    System.out.println(e);
                }
                return;
            } /* end teardown of the IPSec tunnels */

            // The help text advertises the short form; the long form is the
            // one that was accepted before. Both are honoured.
            if (cmd.equals("disp_ipsec_tunnels")
                || cmd.equals("dispose_ipsec_tunnels")) {
                IPSecTunneler tunneler = IPSecTunneler.getInstance();
                tunneler.disposeAllTunnels();
                return;
            }

            /** End setting up IPSec Tunnel */
        }

        /** Writes the command summary to the client. */
        private void printHelp() {
            sockWriter
                .println("Connect              -> c broker portNum t/u/nio/http [file]");
            sockWriter.println("Node Address Request -> na broker:portNum "
                + "levels");
            sockWriter
                .println("Create Gateway       -> ga broker:portNum level");
            sockWriter.println("Enable Monitoring    -> em");
            sockWriter.println("Disable Monitoring   -> dm");

            String ipsecInfo = "SETUP IPSec Tunnel   -> "
                + "cr_ipsec_tunnel <hostname> <policy_name> "
                + "<rule_name> <secret>\n"
                + "TEARDOWN IPSec Tunnel-> "
                + "del_ipsec_tunnel <hostname>\n"
                + "Dispose IPSec Tunnels-> "
                + "disp_ipsec_tunnels\n";
            sockWriter.println(ipsecInfo);
        }

        /** Closes the client socket, ignoring any failure while doing so. */
        private void closeConnection() {
            try {
                sock.close();
            } catch (IOException ignored) {
                // best effort: the connection is being abandoned anyway
            }
        }
    }
}
